// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include <nebula/core/chunked_array.h>  // IWYU pragma: keep
#include <nebula/core/record_batch.h>
#include <turbo/utility/status.h>
#include <nebula/types/type.h>
#include <nebula/types/type_fwd.h>
#include <turbo/base/macros.h>

namespace nebula {

    class Array;

    class ChunkedArray;

    class KeyValueMetadata;

    class MemoryPool;

    /// \class Table
    /// \brief Logical table as sequence of chunked arrays
    ///
    /// A Table pairs a Schema with one ChunkedArray per schema field; all
    /// columns share the same logical row count (see num_rows()).
    class TURBO_EXPORT Table {
    public:
        virtual ~Table() = default;

        /// \brief Construct a Table from schema and columns
        ///
        /// If columns is zero-length, the table's number of rows is zero
        ///
        /// \param[in] schema The table schema (column types)
        /// \param[in] columns The table's columns as chunked arrays
        /// \param[in] num_rows number of rows in table, -1 (default) to infer from columns
        static std::shared_ptr<Table> create(std::shared_ptr<Schema> schema,
                                           std::vector<std::shared_ptr<ChunkedArray>> columns,
                                           int64_t num_rows = -1);

        /// \brief Construct a Table from schema and arrays
        ///
        /// \param[in] schema The table schema (column types)
        /// \param[in] arrays The table's columns as arrays
        /// \param[in] num_rows number of rows in table, -1 (default) to infer from columns
        static std::shared_ptr<Table> create(std::shared_ptr<Schema> schema,
                                           const std::vector<std::shared_ptr<Array>> &arrays,
                                           int64_t num_rows = -1);

        /// \brief create an empty Table of a given schema
        ///
        /// The output Table will be created with a single empty chunk per column.
        ///
        /// \param[in] schema the schema of the empty Table
        /// \param[in] pool the memory pool to allocate memory from
        /// \return the resulting Table
        static turbo::Result<std::shared_ptr<Table>> make_empty(
                std::shared_ptr<Schema> schema, MemoryPool *pool = default_memory_pool());

        /// \brief Construct a Table from a RecordBatchReader.
        ///
        /// \param[in] reader the nebula::RecordBatchReader that produces batches
        static turbo::Result<std::shared_ptr<Table>> from_record_batch_reader(RecordBatchReader *reader);

        /// \brief Construct a Table from RecordBatches, using schema supplied by the first
        /// RecordBatch.
        ///
        /// \param[in] batches a std::vector of record batches
        static turbo::Result<std::shared_ptr<Table>> from_record_batches(
                const std::vector<std::shared_ptr<RecordBatch>> &batches);

        /// \brief Construct a Table from RecordBatches, using supplied schema. There may be
        /// zero record batches
        ///
        /// \param[in] schema the nebula::Schema for each batch
        /// \param[in] batches a std::vector of record batches
        static turbo::Result<std::shared_ptr<Table>> from_record_batches(
                std::shared_ptr<Schema> schema,
                const std::vector<std::shared_ptr<RecordBatch>> &batches);

        /// \brief Construct a Table from a chunked StructArray. One column will be produced
        /// for each field of the StructArray.
        ///
        /// \param[in] array a chunked StructArray
        static turbo::Result<std::shared_ptr<Table>> from_chunked_struct_array(
                const std::shared_ptr<ChunkedArray> &array);

        /// \brief Return the table schema
        const std::shared_ptr<Schema> &schema() const { return schema_; }

        /// \brief Return a column by index
        virtual std::shared_ptr<ChunkedArray> column(int i) const = 0;

        /// \brief Return vector of all columns for table
        virtual const std::vector<std::shared_ptr<ChunkedArray>> &columns() const = 0;

        /// Return a column's field by index
        std::shared_ptr<Field> field(int i) const { return schema_->field(i); }

        /// \brief Return vector of all fields for table
        std::vector<std::shared_ptr<Field>> fields() const;

        /// \brief Construct a zero-copy slice of the table with the
        /// indicated offset and length
        ///
        /// \param[in] offset the index of the first row in the constructed
        /// slice
        /// \param[in] length the number of rows of the slice. If there are not enough
        /// rows in the table, the length will be adjusted accordingly
        ///
        /// \return a new object wrapped in std::shared_ptr<Table>
        virtual std::shared_ptr<Table> slice(int64_t offset, int64_t length) const = 0;

        /// \brief slice from first row at offset until end of the table
        ///
        /// Passing num_rows_ as the length is always sufficient; slice() clamps
        /// the length to the rows actually available past the offset.
        std::shared_ptr<Table> slice(int64_t offset) const { return slice(offset, num_rows_); }

        /// \brief Return a column by name
        /// \param[in] name field name
        /// \return the matching ChunkedArray, or nullptr if no field was found
        std::shared_ptr<ChunkedArray> get_column_by_name(const std::string &name) const {
            // get_field_index returns -1 when the schema has no field with this name
            auto i = schema_->get_field_index(name);
            return i == -1 ? nullptr : column(i);
        }

        /// \brief Remove column from the table, producing a new Table
        virtual turbo::Result<std::shared_ptr<Table>> remove_column(int i) const = 0;

        /// \brief Add column to the table, producing a new Table
        virtual turbo::Result<std::shared_ptr<Table>> add_column(
                int i, std::shared_ptr<Field> field_arg,
                std::shared_ptr<ChunkedArray> column) const = 0;

        /// \brief Replace a column in the table, producing a new Table
        virtual turbo::Result<std::shared_ptr<Table>> set_column(
                int i, std::shared_ptr<Field> field_arg,
                std::shared_ptr<ChunkedArray> column) const = 0;

        /// \brief Return names of all columns
        std::vector<std::string> column_names() const;

        /// \brief Rename columns with provided names
        turbo::Result<std::shared_ptr<Table>> rename_columns(
                const std::vector<std::string> &names) const;

        /// \brief Return new table with specified columns
        turbo::Result<std::shared_ptr<Table>> select_columns(const std::vector<int> &indices) const;

        /// \brief Replace schema key-value metadata with new metadata
        ///
        /// \param[in] metadata new KeyValueMetadata
        /// \return new Table
        virtual std::shared_ptr<Table> replace_schema_metadata(
                const std::shared_ptr<const KeyValueMetadata> &metadata) const = 0;

        /// \brief Flatten the table, producing a new Table.  Any column with a
        /// struct type will be flattened into multiple columns
        ///
        /// \param[in] pool The pool for buffer allocations, if any
        virtual turbo::Result<std::shared_ptr<Table>> Flatten(
                MemoryPool *pool = default_memory_pool()) const = 0;

        /// \brief snake_case alias for Flatten(), matching the naming convention
        /// used by the rest of this class.
        ///
        /// \param[in] pool The pool for buffer allocations, if any
        turbo::Result<std::shared_ptr<Table>> flatten(
                MemoryPool *pool = default_memory_pool()) const {
            return Flatten(pool);
        }

        /// \return pretty_print representation suitable for debugging
        std::string to_string() const;

        /// \brief Perform cheap validation checks to determine obvious inconsistencies
        /// within the table's schema and internal data.
        ///
        /// This is O(k*m) where k is the total number of field descendants,
        /// and m is the number of chunks.
        ///
        /// \return turbo::Status
        virtual turbo::Status validate() const = 0;

        /// \brief Perform extensive validation checks to determine inconsistencies
        /// within the table's schema and internal data.
        ///
        /// This is O(k*n) where k is the total number of field descendants,
        /// and n is the number of rows.
        ///
        /// \return turbo::Status
        virtual turbo::Status validate_full() const = 0;

        /// \brief Return the number of columns in the table
        int num_columns() const { return schema_->num_fields(); }

        /// \brief Return the number of rows (equal to each column's logical length)
        int64_t num_rows() const { return num_rows_; }

        /// \brief Determine if tables are equal
        ///
        /// Two tables can be equal only if they have equal schemas.
        /// However, they may be equal even if they have different chunkings.
        bool equals(const Table &other, bool check_metadata = false) const;

        /// \brief Make a new table by combining the chunks this table has.
        ///
        /// All the underlying chunks in the ChunkedArray of each column are
        /// concatenated into zero or one chunk.
        ///
        /// \param[in] pool The pool for buffer allocations
        turbo::Result<std::shared_ptr<Table>> combine_chunks(
                MemoryPool *pool = default_memory_pool()) const;

        /// \brief Make a new record batch by combining the chunks this table has.
        ///
        /// All the underlying chunks in the ChunkedArray of each column are
        /// concatenated into a single chunk.
        ///
        /// \param[in] pool The pool for buffer allocations
        turbo::Result<std::shared_ptr<RecordBatch>> combine_chunks_to_batch(
                MemoryPool *pool = default_memory_pool()) const;

    protected:
        // Only constructible by concrete subclasses, which are responsible for
        // populating schema_ and num_rows_.
        Table();

        std::shared_ptr<Schema> schema_;  // column types; shared by all derived views
        int64_t num_rows_;                // logical length of every column

    private:
        TURBO_DISALLOW_COPY_AND_ASSIGN(Table);
    };

    /// \brief Compute a stream of record batches from a (possibly chunked) Table
    ///
    /// The conversion is zero-copy: each record batch is a view over a slice
    /// of the table's columns.
    ///
    /// The table is expected to be valid prior to using it with the batch reader.
    class TURBO_EXPORT TableBatchReader : public RecordBatchReader {
    public:
        /// \brief Construct a TableBatchReader for the given table
        ///
        /// The caller must keep `table` alive for the lifetime of the reader.
        explicit TableBatchReader(const Table &table);

        /// \brief Construct a TableBatchReader that shares ownership of the table
        ///
        /// NOTE(review): owned_table_ appears to keep the table alive for the
        /// reader's lifetime — confirm in the implementation.
        explicit TableBatchReader(std::shared_ptr<Table> table);

        /// \brief Return the schema of the batches produced by this reader
        std::shared_ptr<Schema> schema() const override;

        /// \brief Read the next record batch into *out; see RecordBatchReader
        /// for the end-of-stream convention.
        turbo::Status read_next(std::shared_ptr<RecordBatch> *out) override;

        /// \brief Set the desired maximum number of rows for record batches
        ///
        /// The actual number of rows in each record batch may be smaller, depending
        /// on actual chunking characteristics of each table column.
        void set_chunk_size(int64_t chunksize);

    private:
        std::shared_ptr<Table> owned_table_;           // non-null only for the shared_ptr ctor
        const Table &table_;                           // table being iterated
        std::vector<ChunkedArray *> column_data_;      // per-column chunked data
        std::vector<int> chunk_numbers_;               // per-column current chunk index
        std::vector<int64_t> chunk_offsets_;           // per-column offset within current chunk
        int64_t absolute_row_position_;                // rows already emitted across all batches
        int64_t max_chunksize_;                        // cap set by set_chunk_size()
    };

    /// \defgroup concat-tables concatenate_tables function.
    ///
    /// concatenate_tables function.
    /// @{

    /// \brief Controls the behavior of concatenate_tables().
    struct TURBO_EXPORT ConcatenateTablesOptions {
        /// If true, the schemas of the tables will be first unified with fields of
        /// the same name being merged, according to `field_merge_options`, then each
        /// table will be promoted to the unified schema before being concatenated.
        /// Otherwise, all tables should have the same schema. Each column in the output table
        /// is the result of concatenating the corresponding columns in all input tables.
        bool unify_schemas = false;

        /// options to control how fields are merged when unifying schemas
        ///
        /// This field will be ignored if unify_schemas is false
        Field::MergeOptions field_merge_options = Field::MergeOptions::defaults();

        /// \brief Return an options instance with every field at its default value
        static ConcatenateTablesOptions defaults() { return {}; }
    };

    /// \brief Construct a new table from multiple input tables.
    ///
    /// The new table is assembled from existing column chunks without copying,
    /// if schemas are identical. If schemas do not match exactly and
    /// unify_schemas is enabled in options (off by default), an attempt is
    /// made to unify them, and then column chunks are converted to their
    /// respective unified datatype, which will probably incur a copy.
    /// :func:`nebula::promote_table_to_schema` is used to unify schemas.
    ///
    /// Tables are concatenated in order they are provided in and the order of
    /// rows within tables will be preserved.
    ///
    /// \param[in] tables a std::vector of Tables to be concatenated
    /// \param[in] options specify how to unify schema of input tables
    /// \param[in] memory_pool MemoryPool to be used if null-filled arrays need to
    /// be created or if existing column chunks need to endure type conversion
    /// \return new Table
    TURBO_EXPORT
    turbo::Result<std::shared_ptr<Table>> concatenate_tables(
            const std::vector<std::shared_ptr<Table>> &tables,
            ConcatenateTablesOptions options = ConcatenateTablesOptions::defaults(),
            MemoryPool *memory_pool = default_memory_pool());

    /// @}

    // Forward declaration so this header does not need to include the compute
    // headers; only promote_table_to_schema below refers to CastOptions.
    namespace compute {
        class CastOptions;
    }

    /// \brief Promotes a table to conform to the given schema.
    ///
    /// If a field in the schema does not have a corresponding column in
    /// the table, a column of nulls will be added to the resulting table.
    /// If the corresponding column is of type Null, it will be promoted
    /// to the type specified by schema, with null values filled. The
    /// column will be casted to the type specified by the schema.
    ///
    /// NOTE(review): unlike the overload below that takes compute::CastOptions,
    /// this overload presumably casts with default options — confirm in the
    /// implementation.
    ///
    /// Returns an error:
    /// - if the corresponding column's type is not compatible with the
    ///   schema.
    /// - if there is a column in the table that does not exist in the schema.
    /// - if the cast fails or casting would be required but is not available.
    ///
    /// \param[in] table the input Table
    /// \param[in] schema the target schema to promote to
    /// \param[in] pool The memory pool to be used if null-filled arrays need to
    /// be created.
    TURBO_EXPORT
    turbo::Result<std::shared_ptr<Table>> promote_table_to_schema(
            const std::shared_ptr<Table> &table, const std::shared_ptr<Schema> &schema,
            MemoryPool *pool = default_memory_pool());

    /// \brief Promotes a table to conform to the given schema, with explicit
    /// control over how columns are cast.
    ///
    /// If a field in the schema does not have a corresponding column in
    /// the table, a column of nulls will be added to the resulting table.
    /// If the corresponding column is of type Null, it will be promoted
    /// to the type specified by schema, with null values filled. The column
    /// will be casted to the type specified by the schema.
    ///
    /// Returns an error:
    /// - if the corresponding column's type is not compatible with the
    ///   schema.
    /// - if there is a column in the table that does not exist in the schema.
    /// - if the cast fails or casting would be required but is not available.
    ///
    /// \param[in] table the input Table
    /// \param[in] schema the target schema to promote to
    /// \param[in] options The cast options to allow promotion of types
    /// \param[in] pool The memory pool to be used if null-filled arrays need to
    /// be created.
    TURBO_EXPORT
    turbo::Result<std::shared_ptr<Table>> promote_table_to_schema(
            const std::shared_ptr<Table> &table, const std::shared_ptr<Schema> &schema,
            const compute::CastOptions &options, MemoryPool *pool = default_memory_pool());

}  // namespace nebula
